/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
package org.unidata.mdm.dq.data.service.segments.records.upsert;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.type.timeline.TimeInterval;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.data.type.data.EtalonRecord;
import org.unidata.mdm.data.type.data.OriginRecord;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.dq.core.context.DataQualityContext;
import org.unidata.mdm.dq.core.dto.DataQualityResult;
import org.unidata.mdm.dq.core.service.DataQualityService;
import org.unidata.mdm.dq.core.type.model.instance.NamespaceAssignmentElement;
import org.unidata.mdm.dq.data.context.RecordQualityContext;
import org.unidata.mdm.dq.data.module.DataQualityDataModule;
import org.unidata.mdm.search.type.id.AbstractManagedIndexId;
import org.unidata.mdm.system.type.pipeline.Point;
import org.unidata.mdm.system.type.pipeline.Start;

/**
 * DQ processing point.
 *
 * @author Mikhail Mikhailov on Apr 9, 2021
 */
@Component(RecordUpsertQualityPointExecutor.SEGMENT_ID)
public class RecordUpsertQualityPointExecutor extends Point<RecordQualityContext> {
    /**
     * ID of this segment (also used as the component name).
     */
    public static final String SEGMENT_ID = DataQualityDataModule.MODULE_ID + "[RECORD_UPSERT_QUALITY_POINT]";
    /**
     * Code of the localized description message.
     */
    public static final String SEGMENT_DESCRIPTION = DataQualityDataModule.MODULE_ID + ".record.upsert.quality.point.description";
    /**
     * The data quality service, which executes the rules.
     */
    @Autowired
    private DataQualityService dataQualityService;
    /**
     * Constructor. Passes segment ID and description code to the super class.
     */
    public RecordUpsertQualityPointExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }
    /**
     * Runs data quality for every interval of the next timeline, that has a calculation result,
     * and hands the collected results over to the context.
     * Results, which are valid and not enriched, are not collected.
     * If the next timeline is null or empty, or the context has no assignment,
     * {@code null} is handed over to the context instead of a map.
     *
     * @param ctx the record quality context
     */
    @Override
    public void point(RecordQualityContext ctx) {

        Map<TimeInterval<OriginRecord>, DataQualityResult> results = null;

        Timeline<OriginRecord> timeline = ctx.nextTimeline();
        boolean applicable = timeline != null && !timeline.isEmpty() && ctx.hasAssignment();
        if (applicable) {

            results = new HashMap<>();
            NamespaceAssignmentElement assignment = ctx.getAssignment();
            RecordKeys recordKeys = timeline.getKeys();

            for (TimeInterval<OriginRecord> interval : timeline) {

                EtalonRecord etalon = interval.getCalculationResult();
                if (Objects.isNull(etalon)) {
                    // Nothing calculated for this period, nothing to check.
                    continue;
                }

                DataQualityResult outcome = evaluate(ctx, assignment, recordKeys, interval, etalon);

                // Only invalid or enriched results are of interest.
                if (!outcome.isValid() || outcome.isEnriched()) {
                    results.put(interval, outcome);
                }
            }
        }

        ctx.qualityResult(results);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        Class<?> inputType = start.getInputTypeClass();
        return RecordQualityContext.class.isAssignableFrom(inputType);
    }
    /**
     * Applies data quality to the etalon record of a single interval.
     *
     * @param ctx the record quality context, supplying the payload
     * @param assignment the namespace assignment, supplying the mappings and the namespace
     * @param recordKeys the keys of the timeline, supplying the entity name and the source system
     * @param interval the interval being processed
     * @param etalon the calculation result of the interval
     * @return the result, returned by the data quality service
     */
    private DataQualityResult evaluate(
            RecordQualityContext ctx,
            NamespaceAssignmentElement assignment,
            RecordKeys recordKeys,
            TimeInterval<OriginRecord> interval,
            EtalonRecord etalon) {

        // We use index period id here for identification purposes,
        // but this can be anything else.
        String inputId = AbstractManagedIndexId.periodIdValToString(interval.getPeriodId());

        return dataQualityService.apply(DataQualityContext.builder()
                .payload(ctx.getPayload())
                .mappings(assignment.getAsssigned(recordKeys.getEntityName()))
                .sourceSystem(recordKeys.getOriginKey().getSourceSystem())
                .input(assignment.getNameSpace(), etalon.getInfoSection().getEntityName(), inputId, etalon)
                .build());
    }
}
